
package com.shiku.commons.thread.pool;


import java.util.concurrent.ConcurrentLinkedQueue;


public abstract class AbstractQueueRunnable<T>
        extends BaseRunnable {
    protected ConcurrentLinkedQueue<T> msgQueue = new ConcurrentLinkedQueue<>();

    protected long sleep = 1000L;


    public void setSleep(long sleep) {

        this.sleep = sleep;

    }


    protected int batchSize = 1;


    public void setBatchSize(int batchSize) {

        this.batchSize = batchSize;

    }


    public abstract void runTask();


    @Override
    public void run() {

        while (true) {

            try {

                synchronized (this.msgQueue) {

                    if (this.msgQueue.isEmpty()) {

                        this.msgQueue.wait();

                    }

                }

                this.loopCount.set(0);

                runTask();

            } catch (Exception e) {

                e.printStackTrace();

            }

        }

    }


    public boolean addMsg(T t) {

        synchronized (this.msgQueue) {

            this.msgQueue.offer(t);

            this.msgQueue.notifyAll();

        }

        return true;

    }


    public void clearMsgQueue() {

        this.msgQueue.clear();

    }


    public boolean isNeededExecute() {

        return !this.msgQueue.isEmpty();

    }

}


